Apache Flink এ Streaming SQL এবং Batch SQL ব্যবহার করে ডেটা স্ট্রিম এবং ব্যাচ ডেটাসেট উভয়ই প্রসেস করা যায়। Flink SQL এর মাধ্যমে ডেভেলপাররা রিয়েল-টাইম ডেটা স্ট্রিম এবং ঐতিহ্যবাহী ব্যাচ ডেটা প্রসেসিং-এর উপর SQL কিউরি চালাতে পারেন। Flink এর SQL API স্ট্রিম এবং ব্যাচ উভয় প্রক্রিয়ার জন্য একটি ইউনিফাইড ইন্টারফেস প্রদান করে, যা ডেটা প্রসেসিং সহজ এবং শক্তিশালী করে তোলে।
Flink এ Streaming SQL হল রিয়েল-টাইম স্ট্রিম ডেটা প্রসেসিং করার জন্য একটি শক্তিশালী টুল। এটি স্ট্রিম ডেটাকে Table হিসেবে উপস্থাপন করে এবং SQL কিউরির মাধ্যমে ডেটার উপর বিভিন্ন ট্রান্সফর্মেশন ও এনালাইসিস করা যায়।
StreamTableEnvironment
তৈরি করতে হবে।// 1. Create StreamTableEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2. Register a Kafka Source Table
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id STRING, " +
" product_id STRING, " +
" quantity INT, " +
" order_time TIMESTAMP(3), " +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka', " +
" 'topic' = 'orders', " +
" 'properties.bootstrap.servers' = 'localhost:9092', " +
" 'format' = 'json'" +
")"
);
// 3. Execute a Streaming SQL Query
Table result = tableEnv.sqlQuery(
"SELECT product_id, SUM(quantity) AS total_quantity " +
"FROM orders " +
"GROUP BY product_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
);
// 4. Register a Sink Table and write results
tableEnv.executeSql(
"CREATE TABLE result_sink (" +
" product_id STRING, " +
" total_quantity BIGINT" +
") WITH (" +
" 'connector' = 'print'" +
")"
);
result.executeInsert("result_sink");
এই উদাহরণে:
orders
নামে একটি সোর্স টেবিল রেজিস্টার করা হয়েছে।product_id
এর মোট quantity
গণনা করা হচ্ছে।result_sink
টেবিলে লেখা হচ্ছে।Batch SQL Flink এ ঐতিহ্যবাহী ব্যাচ ডেটা প্রসেসিং করার জন্য ব্যবহৃত হয়। Flink ব্যাচ ডেটাসেটের উপর SQL কিউরি চালাতে পারে এবং প্রয়োজনীয় ট্রান্সফর্মেশন করতে পারে। Flink এর SQL API ব্যাচ প্রসেসিংয়ের জন্যও একই ইন্টারফেস ব্যবহার করে, যা ইউনিফাইড ডেটা প্রসেসিং সিস্টেম তৈরি করে।
TableEnvironment
তৈরি করতে হবে।// 1. Create TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 2. Register a File Source Table
tableEnv.executeSql(
"CREATE TABLE sales (" +
" sale_id STRING, " +
" product_id STRING, " +
" quantity INT, " +
" sale_date DATE" +
") WITH (" +
" 'connector' = 'filesystem', " +
" 'path' = 'file:///path/to/sales.csv', " +
" 'format' = 'csv'" +
")"
);
// 3. Execute a Batch SQL Query
Table result = tableEnv.sqlQuery(
"SELECT product_id, SUM(quantity) AS total_quantity " +
"FROM sales " +
"GROUP BY product_id"
);
// 4. Register a Sink Table and write results
tableEnv.executeSql(
"CREATE TABLE result_sink (" +
" product_id STRING, " +
" total_quantity BIGINT" +
") WITH (" +
" 'connector' = 'print'" +
")"
);
result.executeInsert("result_sink");
এই উদাহরণে:
sales
নামে একটি ফাইল সোর্স টেবিল রেজিস্টার করা হয়েছে যা একটি CSV ফাইল থেকে ডেটা পড়ছে।product_id
এর মোট quantity
গণনা করা হয়েছে।result_sink
টেবিলে লেখা হচ্ছে।বৈশিষ্ট্য | Streaming SQL | Batch SQL |
---|---|---|
ডেটা প্রসেসিং | ক্রমাগত এবং রিয়েল-টাইম ডেটা প্রসেসিং | ঐতিহ্যবাহী ব্যাচ ডেটা প্রসেসিং |
SQL ফাংশন | উইন্ডো ফাংশন, অ্যাগ্রিগেশন, টাম্বলিং উইন্ডো, সেশন উইন্ডো | স্ট্যান্ডার্ড SQL ফাংশন এবং অ্যাগ্রিগেশন |
টেবিল টাইপ | স্ট্রিম টেবিল (অবিরত পরিবর্তিত হয়) | স্থায়ী টেবিল (একবার লোড হয়ে স্থির থাকে) |
ইউস কেস | রিয়েল-টাইম অ্যানালাইসিস, ইভেন্ট প্রসেসিং | ঐতিহ্যবাহী ব্যাচ ডেটা এনালাইসিস, রিপোর্টিং |
Flink এ Streaming এবং Batch SQL এর মাধ্যমে ডেভেলপাররা সহজে ডেটা প্রসেসিং করতে পারেন এবং ডেটার উপর দ্রুত ও কার্যকরীভাবে কিউরি চালাতে পারেন। এটি বড় আকারের ডেটা প্রসেসিং এবং রিয়েল-টাইম ডেটা এনালাইসিসের জন্য একটি শক্তিশালী টুল।
আরও দেখুন...